package windowtest

import Source.SourceTest.SensorReading
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time


// 每15秒统计一次，窗口内各传感器所有温度的最小值,以及最新的时间戳
object Seconds15window {

  def main(args: Array[String]): Unit = {

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//    val inputPath = "E:\\Flinklearn\\Flink\\resources\\data1\\out.txt"
//    val inputStream: DataStream[String] = env.readTextFile(inputPath)

    val inputStream: DataStream[String] = env.socketTextStream("master", 7777)

    // 转换成样例类类型
    val dataStream: DataStream[SensorReading] = inputStream.map(
      data => {
        val fields = data.split(",")
        SensorReading(fields(0), fields(1).toLong, fields(2).toDouble)
      })

    val ds1: DataStream[(String, Double, Long)] = dataStream.map(data => (data.id, data.temperature, data.timestamp))
      .keyBy(_._1).timeWindow(Time.seconds(15))
      // .min(1)
      .reduce((curRes, newRes) => (curRes._1, curRes._2.min(newRes._2), newRes._3))

    ds1.print()

    env.execute()
  }

}

class MyReduceFunc extends ReduceFunction[SensorReading]{
  override def reduce(t: SensorReading, t1: SensorReading): SensorReading = {
  SensorReading(t.id,t1.timestamp,t.temperature.min(t1.temperature))

  }
}
